Web Streams API
chunkをとばせばいい?
Queuing strategy
streamの流速を調整する設定
size: R1つを何個として数えるかを指定する
ReadableStreamController.desiredSizeと内部キューのサイズの合計
組み込みのQueuingStrategy
1 byteを1個とカウントする
R一つを1個とカウントする
R = Uint8Arrayのとき、CountQueuingStrategyはUint8Array objectを1個として数えるが、ByteLengthQueuingStrategyは実際のバイト数で数える
queueの図解わかりやすい
(ReadableStreamController.desiredSize ?? -1) <= 0のとき内部queueがいっぱいだと判定する
このときbackpressureが解除される
viewが存在する場面
Denoで試したところ、start()だとReadableByteStreamController.byobRequestが存在しない
code:queueが空になってからpushを再開する.ts
/// <reference lib="dom.asynciterable" />
import { delay } from "jsr:@std/async@^1.0.11/delay";
const highWaterMark = 128;
const readable = new ReadableStream({
type: "bytes",
async start(controller) {
while (true) {
const queuedSize = highWaterMark - (controller.desiredSize ?? 0);
if (controller.byobRequest) {
const chunk = controller.byobRequest.view!;
const buffer = new Uint8Array(
chunk.buffer,
chunk.byteOffset,
chunk.byteLength,
);
for (let i = 0; i < chunk.byteLength; i++) {
bufferi = i + queuedSize; }
controller.byobRequest.respond(chunk.byteLength);
} else {
const chunkSize = this.autoAllocateChunkSize!;
if ((controller.desiredSize ?? 0) < chunkSize) return;
console.log("Desired size", controller.desiredSize);
const chunk = new Uint8Array(chunkSize);
for (let i = 0; i < chunk.length; i++) {
}
controller.enqueue(chunk);
}
await delay(0);
}
},
pull(controller) {
// この行を消すと、queueに少しでも空きができればpushを再開する
if ((controller.desiredSize ?? 0) < highWaterMark) return;
return this.start!(controller);
},
autoAllocateChunkSize: 16,
}, { highWaterMark });
for await (const chunk of readable) {
console.log(chunk);
await delay(1000);
}
(TransformStreamController.desiredSize ?? -1) <= 0のとき内部queueがいっぱいだと判定する
書き込みが完了するまでに受診されたchunksは、ReadableStreamの内部queueに貯まる
chunksの量がhighWaterMarkに到達すると、(backpressureを考慮したReadableStream/TransformStreamだった場合)上流へ背圧がかかる
消費速度の速いほうを基準に背圧がかかる
仕様書の作成例